# coding: UTF-8
from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext
import sys, datetime, time,json
import urllib
reload(sys)
sys.setdefaultencoding('utf-8')
conf = SparkConf().set("spark.sql.parquet.compression.codec", "snappy")
sc = SparkContext("yarn-client", "finance_base", conf=conf)
sqlContext = HiveContext(sc)

aaa=sqlContext.read.orc("/user/hive/warehouse/tmp.db/sms_log")
aaa.registerTempTable("base")

df=sqlContext.sql("""
    CREATE TABLE dw.xinyongka_20190126 as
    select
        *
    FROM
        dw.sms_log
    WHERE pt>='201805' and
        smslabel ='百度'
        AND (msg_content RLIKE '.*百度有钱花.*自动还款.*'
        OR msg_content RLIKE '.*自动还款.*'
        OR msg_content RLIKE '.*大额尊享.提醒您.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/1/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201806' and
        smslabel ='百度'
        AND (msg_content RLIKE '.*百度有钱花.*验证码.*'
        OR msg_content RLIKE '.*满易贷.*申请借款.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/2/" )




df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='你我贷'
        AND (msg_content RLIKE '.*成功充值.*'
        OR msg_content RLIKE '.*申请.*'
        OR msg_content RLIKE '.*还款金额为.*'
        OR msg_content RLIKE '.*成功扣款.*'
        OR msg_content RLIKE '.*审核通过.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/3/" )



df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='你我贷'
        AND (msg_content RLIKE '.*验证码.*'
        OR msg_content RLIKE '.*申请被拒.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/4/" )



df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富万卡'
        AND (msg_content RLIKE '.*成功借款.*'
        OR msg_content RLIKE '.*已成功激活.*'
        OR msg_content RLIKE '.*恭喜您完成授信任务.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/5/" )



df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富万卡'
        AND msg_content RLIKE '.*验证码.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/6/" )



df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富叮当'
        AND (msg_content RLIKE '.*两小时内到账.*'
        OR msg_content RLIKE '.*本期应还总额.*'
         OR msg_content RLIKE '.*已审核通过.*'
         OR msg_content RLIKE '.*还款日.*'
         OR msg_content RLIKE '.*本次还款金额.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/7/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富叮当'
        AND msg_content RLIKE '.*验证码.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/8/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富叮当'
        AND msg_content RLIKE '.*本次还款金额.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/9/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富叮当'
        AND msg_content RLIKE '.*玖富叮当.*您已严重逾期.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/10/" )




df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='拍拍贷'
        AND msg_content RLIKE '.*拍拍贷.*金额.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/11/" )



df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='翼龙贷'
        AND (msg_content RLIKE '.*翼龙贷.*成功扣款.*' OR msg_content RLIKE '.*翼龙贷.*还款本金.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/12/" )

df=sqlContext.sql("""
    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='金瑞龙'
        AND msg_content RLIKE '.*验证码.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/13/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='益秒到'
        AND (msg_content RLIKE '.*益秒到.*您的.*' OR msg_content RLIKE '.*益秒到.*已通过额度宝系统审核.*')
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/14/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='益秒到'
        AND msg_content RLIKE '.*验证码.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/15/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='易美健'
        AND msg_content RLIKE '.*本期金额.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/16/" )



df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='易美健'
        AND msg_content RLIKE '.*验证码.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/17/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='钱到到'
        AND msg_content RLIKE '.*钱到到.*您的.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/18/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='钱到到'
        AND msg_content RLIKE '.*钱到到.*本月账单.*'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/19/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='西南证券'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/20/" )


df=sqlContext.sql("""

    select
        DISTINCT  msisdn
    FROM
        base
    WHERE pt='201712' and
        smslabel ='玖富证券'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/21/" )

aaa=sqlContext.read.orc("/user/hive/warehouse/tmp.db/sms_log")
aaa.registerTempTable("base")
df=sqlContext.sql("""

    select
        *
    FROM
        base
    WHERE pt>'201805' and
        smslabel ='兴业证券'
""")
df.repartition(1).write.mode("overwrite").orc("/user/hive/warehouse/tmp.db/zhengquan_20181022/" )

aaa=sqlContext.read.orc("/user/hive/warehouse/tmp.db/zhengquan_20181022/*")
aaa.registerTempTable("aaa")
df=sqlContext.sql("""
    select msisdn FROM
      aaa ORDER BY insert_time desc
""")